/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.transforms.splittabledofn;



import com.bff.gaia.unified.sdk.io.range.ByteKey;

import com.bff.gaia.unified.sdk.io.range.ByteKeyRange;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import com.bff.gaia.unified.vendor.guava.com.google.common.primitives.Bytes;



import javax.annotation.Nullable;

import java.math.BigDecimal;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.*;



/**
 * A {@link RestrictionTracker} for claiming {@link ByteKey}s in a {@link ByteKeyRange} in a
 * monotonically increasing fashion. The range is a semi-open interval [startKey, endKey); an end
 * key equal to {@link ByteKey#EMPTY} is treated by this tracker as "no upper bound".
 *
 * <p>Note, one can complete a range by claiming the {@link ByteKey#EMPTY} once one runs out of keys
 * to process.
 */
public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKey>
    implements Backlogs.HasBacklog {
  /* An empty range which contains no keys. */
  @VisibleForTesting
  static final ByteKeyRange NO_KEYS = ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x00));

  /* Suffix appended to a key by next(ByteKey) to build its immediate successor. */
  private static final byte[] ZERO_BYTE_ARRAY = new byte[] {0};

  /** The restriction currently owned by this tracker; narrowed by {@link #checkpoint()}. */
  private ByteKeyRange range;

  /** The last key for which {@link #tryClaim} returned {@code true}, or null if none yet. */
  @Nullable
  private ByteKey lastClaimedKey = null;

  /** The last key passed to {@link #tryClaim}, whether or not the claim succeeded. */
  @Nullable
  private ByteKey lastAttemptedKey = null;

  private ByteKeyRangeTracker(ByteKeyRange range) {
    this.range = checkNotNull(range);
  }

  /**
   * Creates a tracker for the given range.
   *
   * <p>The tracker works on a new {@link ByteKeyRange} built from the start and end keys of the
   * supplied one rather than on the supplied instance itself.
   *
   * @param range the range of keys this tracker is responsible for
   * @return a new tracker with no keys attempted yet
   */
  public static ByteKeyRangeTracker of(ByteKeyRange range) {
    return new ByteKeyRangeTracker(ByteKeyRange.of(range.getStartKey(), range.getEndKey()));
  }

  /** Returns the range this tracker is currently responsible for. */
  @Override
  public ByteKeyRange currentRestriction() {
    return range;
  }

  /**
   * Splits off the part of the range that has not been processed yet.
   *
   * <ul>
   *   <li>If no key has been attempted, the whole current range is returned and this tracker is
   *       left with {@link #NO_KEYS}.
   *   <li>If the range is already done (the end-of-range {@link ByteKey#EMPTY} key was attempted,
   *       or the last attempted key is at or past the end key), {@link #NO_KEYS} is returned and
   *       the current range is left unchanged.
   *   <li>Otherwise the returned range starts right after the last attempted key and this
   *       tracker's range is truncated to end there.
   * </ul>
   *
   * @return the residual range to be processed later
   */
  @Override
  public ByteKeyRange checkpoint() {
    // If we haven't done any work, we should return the original range we were processing
    // as the checkpoint.
    if (lastAttemptedKey == null) {
      ByteKeyRange rval = range;
      // We update our current range to an interval that contains no elements.
      range = NO_KEYS;
      return rval;
    }

    // Return an empty range if the current range is done.
    if (lastAttemptedKey.isEmpty()
        || !(range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
      return NO_KEYS;
    }

    // Otherwise we compute the "remainder" of the range from the last key.
    assert lastAttemptedKey.equals(lastClaimedKey)
        : "Expect both keys to be equal since the last key attempted was a valid key in the range.";
    ByteKey nextKey = next(lastAttemptedKey);
    ByteKeyRange res = ByteKeyRange.of(nextKey, range.getEndKey());
    this.range = ByteKeyRange.of(range.getStartKey(), nextKey);
    return res;
  }

  /**
   * Attempts to claim the given key.
   *
   * <p>Must be larger than the last attempted key. Since this restriction tracker represents a
   * range over a semi-open bounded interval {@code [start, end)}, the last key that was attempted
   * may have failed but still have consumed the interval {@code [lastAttemptedKey, end)} since this
   * range tracker processes keys in a monotonically increasing order. Note that passing in {@link
   * ByteKey#EMPTY} claims all keys to the end of range and can only be claimed once; such a claim
   * is recorded as the last attempted key and returns {@code false}.
   *
   * @param key the key to claim
   * @return {@code true} if the key was successfully claimed, {@code false} if it is outside the
   *     current {@link ByteKeyRange} of this tracker.
   * @throws IllegalArgumentException if the key is not larger than the last attempted key, if it
   *     lies before the start of the range, or if {@link ByteKey#EMPTY} is claimed a second time
   */
  @Override
  public boolean tryClaim(ByteKey key) {
    // Handle claiming the end of range EMPTY key
    if (key.isEmpty()) {
      checkArgument(
          lastAttemptedKey == null || !lastAttemptedKey.isEmpty(),
          "Trying to claim key %s while last attempted key was %s",
          key,
          lastAttemptedKey);
      lastAttemptedKey = key;
      return false;
    }

    checkArgument(
        lastAttemptedKey == null || key.compareTo(lastAttemptedKey) > 0,
        "Trying to claim key %s while last attempted key was %s",
        key,
        lastAttemptedKey);
    checkArgument(
        key.compareTo(range.getStartKey()) >= 0,
        "Trying to claim key %s before start of the range %s",
        key,
        range);

    lastAttemptedKey = key;
    // There is deliberately no checkArgument against the end key: attempting a key at or beyond
    // the end of the range is allowed and simply fails the claim.
    if (!range.getEndKey().isEmpty() && key.compareTo(range.getEndKey()) >= 0) {
      return false;
    }
    lastClaimedKey = key;
    return true;
  }

  /**
   * Verifies that the whole range has been processed.
   *
   * @throws IllegalStateException if the range is non-empty and either no key was attempted or the
   *     last attempted key is still before the end of the range
   */
  @Override
  public void checkDone() throws IllegalStateException {
    // Handle checking the empty range which is implicitly done.
    // This case can occur if the range tracker is checkpointed before any keys have been claimed
    // or if the range tracker is checkpointed once the range is done.
    if (NO_KEYS.equals(range)) {
      return;
    }

    checkState(
        lastAttemptedKey != null,
        "Key range is non-empty %s and no keys have been attempted.",
        range);

    // Return if the last attempted key was the empty key representing the end of range for
    // all ranges.
    if (lastAttemptedKey.isEmpty()) {
      return;
    }

    // If the last attempted key was not at or beyond the end of the range then throw.
    if (range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0) {
      ByteKey nextKey = next(lastAttemptedKey);
      throw new IllegalStateException(
          String.format(
              "Last attempted key was %s in range %s, claiming work in [%s, %s) was not attempted",
              lastAttemptedKey, range, nextKey, range.getEndKey()));
    }
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this)
        .add("range", range)
        .add("lastClaimedKey", lastClaimedKey)
        .add("lastAttemptedKey", lastAttemptedKey)
        .toString();
  }

  // Utility methods
  /**
   * Calculates the key that immediately follows the given key by appending a single {@code 0x00}
   * byte to it. For an empty input key the result is therefore the one-byte key {@code 00}.
   */
  @VisibleForTesting
  static ByteKey next(ByteKey key) {
    return ByteKey.copyFrom(Bytes.concat(key.getBytes(), ZERO_BYTE_ARRAY));
  }

  /**
   * Reports the estimated amount of work remaining in the current range as a fraction: {@code 1}
   * when nothing has been attempted, {@code 0} when the range is empty or done, and otherwise one
   * minus the estimated position of the last attempted key within the range.
   */
  @Override
  public Backlog getBacklog() {
    // Return 0 for the empty range which is implicitly done.
    // This case can occur if the range tracker is checkpointed before any keys have been claimed
    // or if the range tracker is checkpointed once the range is done.
    if (NO_KEYS.equals(range)) {
      return Backlog.of(BigDecimal.ZERO);
    }

    // If we are attempting to get the backlog without processing a single key, we return 1.0
    if (lastAttemptedKey == null) {
      return Backlog.of(BigDecimal.ONE);
    }

    // Return 0 if the last attempted key was the empty key representing the end of range for
    // all ranges or the last attempted key is beyond the end of the range.
    if (lastAttemptedKey.isEmpty()
        || !(range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
      return Backlog.of(BigDecimal.ZERO);
    }

    // estimateFractionForKey is taken to be the key's position within the range, i.e. the share
    // of the range already passed (see ByteKeyRange's documentation). The backlog is what is
    // left, so it must be the complement; this keeps it consistent with the 1 (unstarted) and
    // 0 (done) cases above.
    // TODO: Use the ability of BigDecimal's additional precision to more accurately report backlog
    // for keys which are long.
    BigDecimal fractionConsumed =
        BigDecimal.valueOf(range.estimateFractionForKey(lastAttemptedKey));
    return Backlog.of(BigDecimal.ONE.subtract(fractionConsumed));
  }
}